Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.gotocompany.dagger.core.utils.DescriptorsUtil;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -111,10 +112,14 @@ private void setField(String key, Object value, int fieldIndex) {
}

private void setFieldUsingType(String key, Object value, int fieldIndex) {
Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(key);
if (fieldDescriptor == null) {
IllegalArgumentException illegalArgumentException = new IllegalArgumentException("Field Descriptor not found for field: " + key);
reportAndThrowError(illegalArgumentException);
Descriptors.FieldDescriptor fieldDescriptor = null;
try {
fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, key);
if (fieldDescriptor == null) {
throw new IllegalArgumentException("Field Descriptor not found for field: " + key);
}
} catch (RuntimeException exception) {
reportAndThrowError(exception);
}
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor);
rowManager.setInOutput(fieldIndex, typeHandler.transformFromPostProcessor(value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.gotocompany.dagger.core.processors.common.OutputMapping;
import com.gotocompany.dagger.core.processors.common.PostResponseTelemetry;
import com.gotocompany.dagger.core.processors.common.RowManager;
import com.gotocompany.dagger.core.utils.DescriptorsUtil;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
Expand Down Expand Up @@ -157,10 +158,14 @@ private void setField(String key, Object value, int fieldIndex) {
}

private void setFieldUsingType(String key, Object value, Integer fieldIndex) {
Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(key);
if (fieldDescriptor == null) {
IllegalArgumentException illegalArgumentException = new IllegalArgumentException("Field Descriptor not found for field: " + key);
reportAndThrowError(illegalArgumentException);
Descriptors.FieldDescriptor fieldDescriptor = null;
try {
fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, key);
if (fieldDescriptor == null) {
throw new IllegalArgumentException("Field Descriptor not found for field: " + key);
}
} catch (RuntimeException exception) {
reportAndThrowError(exception);
}
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor);
rowManager.setInOutput(fieldIndex, typeHandler.transformFromPostProcessor(value));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.gotocompany.dagger.core.utils;

import com.google.protobuf.Descriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType.MESSAGE;

/**
* Utility class that contains helper methods to get {@link Descriptors} {@link Descriptors.FieldDescriptor}.
*/
public class DescriptorsUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(DescriptorsUtil.class.getName());

/**
* Gets FieldDescriptor .
*
* @param descriptor the descriptor
* @param columnName the columnName
* @return the fieldDescriptor
*/
public static Descriptors.FieldDescriptor getFieldDescriptor(Descriptors.Descriptor descriptor, String columnName) {
if (descriptor == null || columnName == null) {
return null;
}
String[] nestedFields = columnName.split("\\.");
if (nestedFields.length == 1) {
return descriptor.findFieldByName(columnName);
} else {
return getNestedFieldDescriptor(descriptor, nestedFields);
}
}

/**
* Gets FieldDescriptor .
*
* @param parentDescriptor the descriptor
* @param nestedColumnNames the array of columnNames
* @return the fieldDescriptor
*/
public static Descriptors.FieldDescriptor getNestedFieldDescriptor(Descriptors.Descriptor parentDescriptor, String[] nestedColumnNames) {
if (parentDescriptor == null || nestedColumnNames == null || nestedColumnNames.length == 0) {
return null;
}
String childColumnName = nestedColumnNames[0];
if (nestedColumnNames.length == 1) {
return parentDescriptor.findFieldByName(childColumnName);
}
Descriptors.FieldDescriptor childFieldDescriptor = parentDescriptor.findFieldByName(childColumnName);
if (childFieldDescriptor == null || childFieldDescriptor.getJavaType() != MESSAGE) {
LOGGER.info(String.format("Either the Field Descriptor for the field '%s' is missing in the proto '%s', or the Field Descriptor is not of the MESSAGE type.", childColumnName, parentDescriptor.getFullName()));
return null;
}
Descriptors.Descriptor childDescriptor = childFieldDescriptor.getMessageType();
return getNestedFieldDescriptor(childDescriptor, Arrays.copyOfRange(nestedColumnNames, 1, nestedColumnNames.length));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,122 @@ public void shouldDetectProperComplexBodyAndHandleResponseIfRetainResponseTypeIs
verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData));
}

@Test
public void shouldDetectProperComplexBodyAndNestedProtoAndHandleResponseIfRetainResponseTypeIsFalse() throws InvalidProtocolBufferException {
descriptor = TestBookingLogMessage.getDescriptor();
outputMapping.put("driver_pickup_location.address", new OutputMapping("$.driver_pickup_location.address"));
outputMapping.put("driver_pickup_location.name", new OutputMapping("$.driver_pickup_location.name"));

TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build();
TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build();

outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name");
columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames);

grpcSourceConfig = new GrpcSourceConfigBuilder()
.setEndpoint("localhost")
.setServicePort(8000)
.setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest")
.setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse")
.setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod")
.setRequestPattern("{\"key\": \"%s\"}")
.setRequestVariables("customer_id")
.setOutputMapping(outputMapping)
.createGrpcSourceConfig();
grpcSourceConfig.setRetainResponseType(false);

DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray());
GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry());

Row resultStreamData = new Row(2);
Row outputData = new Row(2);
outputData.setField(0, "Indonesia");
outputData.setField(1, "GojekTech");
resultStreamData.setField(0, inputData);
resultStreamData.setField(1, outputData);

grpcResponseHandler.startTimer();
grpcResponseHandler.onNext(message);

verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE);
verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class));
verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData));
}

@Test
public void shouldDetectProperComplexBodyAndNestedProtoAndHandleResponseIfRetainResponseTypeIsTrue() throws InvalidProtocolBufferException {
descriptor = TestBookingLogMessage.getDescriptor();
outputMapping.put("driver_pickup_location.address", new OutputMapping("$.driver_pickup_location.address"));
outputMapping.put("driver_pickup_location.name", new OutputMapping("$.driver_pickup_location.name"));

TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build();
TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build();

outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name");
columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames);

grpcSourceConfig = new GrpcSourceConfigBuilder()
.setEndpoint("localhost")
.setServicePort(8000)
.setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest")
.setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse")
.setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod")
.setRequestPattern("{\"key\": \"%s\"}")
.setRequestVariables("customer_id")
.setOutputMapping(outputMapping)
.createGrpcSourceConfig();
grpcSourceConfig.setRetainResponseType(true);

DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray());
GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry());

Row resultStreamData = new Row(2);
Row outputData = new Row(2);
outputData.setField(0, "Indonesia");
outputData.setField(1, "GojekTech");
resultStreamData.setField(0, inputData);
resultStreamData.setField(1, outputData);

grpcResponseHandler.startTimer();
grpcResponseHandler.onNext(message);

verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE);
verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class));
verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData));
}

@Test
public void shouldThrowErrorWhenNestedFieldIsNotPresentInOutputDescriptor() throws InvalidProtocolBufferException {
descriptor = TestBookingLogMessage.getDescriptor();
outputMapping.put("driver_pickup_location.invalid_address", new OutputMapping("$.driver_pickup_location.address"));
TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build();
TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build();

grpcSourceConfig = new GrpcSourceConfigBuilder().setEndpoint("localhost").setServicePort(8000).setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest").setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse").setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod").setRequestPattern("{\"key\": \"%s\"}").setRequestVariables("customer_id").setOutputMapping(outputMapping).createGrpcSourceConfig();
grpcSourceConfig.setRetainResponseType(false);

outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name");
columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames);

DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray());
GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry());

Row resultStreamData = new Row(2);
Row outputData = new Row(2);
outputData.setField(0, "Indonesia");
outputData.setField(1, "GojekTech");
resultStreamData.setField(0, inputData);
resultStreamData.setField(1, outputData);

grpcResponseHandler.startTimer();
assertThrows(Exception.class, () -> grpcResponseHandler.onNext(message));

ArgumentCaptor<IllegalArgumentException> exceptionCaptor = ArgumentCaptor.forClass(IllegalArgumentException.class);
verify(errorReporter, times(1)).reportFatalException(exceptionCaptor.capture());
assertEquals("Field Descriptor not found for field: driver_pickup_location.invalid_address", exceptionCaptor.getValue().getMessage());

ArgumentCaptor<IllegalArgumentException> exceptionCaptor2 = ArgumentCaptor.forClass(IllegalArgumentException.class);
verify(resultFuture, times(1)).completeExceptionally(exceptionCaptor2.capture());
assertEquals("Field Descriptor not found for field: driver_pickup_location.invalid_address", exceptionCaptor2.getValue().getMessage());
}
}
Loading