Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
chore: Map external_source response to the nested output proto field
In order to appropriately organize the proto, ensuring the mapping of external API responses (whether gRPC or HTTP) to the nested fields in the output proto is efficient. Additionally, this approach, while maintaining compatibility with the prior method of associating external responses to predefined fields, avoids the need to introduce supplementary fields in the output proto.
  • Loading branch information
Shreyansh committed Aug 29, 2023
commit c214766bae6c6d90881fb7d77a8e8ad9a64243fe
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.gotocompany.dagger.core.utils.DescriptorsUtil;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -111,10 +112,14 @@ private void setField(String key, Object value, int fieldIndex) {
}

private void setFieldUsingType(String key, Object value, int fieldIndex) {
Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(key);
if (fieldDescriptor == null) {
IllegalArgumentException illegalArgumentException = new IllegalArgumentException("Field Descriptor not found for field: " + key);
reportAndThrowError(illegalArgumentException);
Descriptors.FieldDescriptor fieldDescriptor = null;
try {
fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, key);
if (fieldDescriptor == null) {
throw new IllegalArgumentException("Field Descriptor not found for field: " + key);
}
} catch (RuntimeException exception) {
reportAndThrowError(exception);
}
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor);
rowManager.setInOutput(fieldIndex, typeHandler.transformFromPostProcessor(value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.gotocompany.dagger.core.processors.common.OutputMapping;
import com.gotocompany.dagger.core.processors.common.PostResponseTelemetry;
import com.gotocompany.dagger.core.processors.common.RowManager;
import com.gotocompany.dagger.core.utils.DescriptorsUtil;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
Expand Down Expand Up @@ -157,10 +158,14 @@ private void setField(String key, Object value, int fieldIndex) {
}

private void setFieldUsingType(String key, Object value, Integer fieldIndex) {
Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(key);
if (fieldDescriptor == null) {
IllegalArgumentException illegalArgumentException = new IllegalArgumentException("Field Descriptor not found for field: " + key);
reportAndThrowError(illegalArgumentException);
Descriptors.FieldDescriptor fieldDescriptor = null;
try {
fieldDescriptor = DescriptorsUtil.getFieldDescriptor(descriptor, key);
if (fieldDescriptor == null) {
throw new IllegalArgumentException("Field Descriptor not found for field: " + key);
}
} catch (RuntimeException exception) {
reportAndThrowError(exception);
}
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor);
rowManager.setInOutput(fieldIndex, typeHandler.transformFromPostProcessor(value));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.gotocompany.dagger.core.utils;

import com.google.protobuf.Descriptors;

import java.util.Arrays;

/**
* Utility class that contains helper methods to get {@link Descriptors} {@link Descriptors.FieldDescriptor}.
*/
public class DescriptorsUtil {

/**
* Gets FieldDescriptor .
*
* @param descriptor the descriptor
* @param columnName the columnName
* @return the fieldDescriptor
*/
public static Descriptors.FieldDescriptor getFieldDescriptor(Descriptors.Descriptor descriptor, String columnName) {
String[] nestedFields = columnName.split("\\.");
if (nestedFields.length == 1) {
return descriptor.findFieldByName(columnName);
} else {
return getNestedFieldDescriptor(descriptor, nestedFields);
}
}

/**
* Gets FieldDescriptor .
*
* @param parentDescriptor the descriptor
* @param nestedColumnNames the array of columnNames
* @return the fieldDescriptor
*/
public static Descriptors.FieldDescriptor getNestedFieldDescriptor(Descriptors.Descriptor parentDescriptor, String[] nestedColumnNames) {
String childColumnName = nestedColumnNames[0];
if (nestedColumnNames.length == 1) {
return parentDescriptor.findFieldByName(childColumnName);
}
Descriptors.FieldDescriptor childFieldDescriptor = parentDescriptor.findFieldByName(childColumnName);
if (childFieldDescriptor == null) {
throw new IllegalArgumentException(String.format("Field Descriptor not found for field: %s in the proto of %s", childColumnName, parentDescriptor.getFullName()));
}
Descriptors.Descriptor childDescriptor = childFieldDescriptor.getMessageType();
return getNestedFieldDescriptor(childDescriptor, Arrays.copyOfRange(nestedColumnNames, 1, nestedColumnNames.length));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,157 @@ public void shouldDetectProperComplexBodyAndHandleResponseIfRetainResponseTypeIs
verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData));
}

@Test
public void shouldDetectProperComplexBodyAndNestedProtoAndHandleResponseIfRetainResponseTypeIsFalse() throws InvalidProtocolBufferException {
descriptor = TestBookingLogMessage.getDescriptor();
outputMapping.put("driver_pickup_location.address", new OutputMapping("$.driver_pickup_location.address"));
outputMapping.put("driver_pickup_location.name", new OutputMapping("$.driver_pickup_location.name"));

TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build();
TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build();

outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name");
columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames);

grpcSourceConfig = new GrpcSourceConfigBuilder()
.setEndpoint("localhost")
.setServicePort(8000)
.setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest")
.setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse")
.setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod")
.setRequestPattern("{\"key\": \"%s\"}")
.setRequestVariables("customer_id")
.setOutputMapping(outputMapping)
.createGrpcSourceConfig();
grpcSourceConfig.setRetainResponseType(false);

DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray());
GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry());

Row resultStreamData = new Row(2);
Row outputData = new Row(2);
outputData.setField(0, "Indonesia");
outputData.setField(1, "GojekTech");
resultStreamData.setField(0, inputData);
resultStreamData.setField(1, outputData);

grpcResponseHandler.startTimer();
grpcResponseHandler.onNext(message);

verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE);
verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class));
verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData));
}

@Test
public void shouldDetectProperComplexBodyAndNestedProtoAndHandleResponseIfRetainResponseTypeIsTrue() throws InvalidProtocolBufferException {
descriptor = TestBookingLogMessage.getDescriptor();
outputMapping.put("driver_pickup_location.address", new OutputMapping("$.driver_pickup_location.address"));
outputMapping.put("driver_pickup_location.name", new OutputMapping("$.driver_pickup_location.name"));

TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build();
TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build();

outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name");
columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames);

grpcSourceConfig = new GrpcSourceConfigBuilder()
.setEndpoint("localhost")
.setServicePort(8000)
.setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest")
.setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse")
.setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod")
.setRequestPattern("{\"key\": \"%s\"}")
.setRequestVariables("customer_id")
.setOutputMapping(outputMapping)
.createGrpcSourceConfig();
grpcSourceConfig.setRetainResponseType(true);

DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray());
GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry());

Row resultStreamData = new Row(2);
Row outputData = new Row(2);
outputData.setField(0, "Indonesia");
outputData.setField(1, "GojekTech");
resultStreamData.setField(0, inputData);
resultStreamData.setField(1, outputData);

grpcResponseHandler.startTimer();
grpcResponseHandler.onNext(message);

verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.SUCCESS_RESPONSE);
verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class));
verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData));
}

@Test
public void shouldThrowErrorWhenNestedFieldIsNotPresentInOutputDescriptor() throws InvalidProtocolBufferException {
descriptor = TestBookingLogMessage.getDescriptor();
outputMapping.put("driver_pickup_location.invalid_address", new OutputMapping("$.driver_pickup_location.address"));
TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build();
TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build();

grpcSourceConfig = new GrpcSourceConfigBuilder().setEndpoint("localhost").setServicePort(8000).setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest").setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse").setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod").setRequestPattern("{\"key\": \"%s\"}").setRequestVariables("customer_id").setOutputMapping(outputMapping).createGrpcSourceConfig();
grpcSourceConfig.setRetainResponseType(false);

outputColumnNames = Arrays.asList("driver_pickup_location.address", "driver_pickup_location.name");
columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames);

DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray());
GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry());

Row resultStreamData = new Row(2);
Row outputData = new Row(2);
outputData.setField(0, "Indonesia");
outputData.setField(1, "GojekTech");
resultStreamData.setField(0, inputData);
resultStreamData.setField(1, outputData);

grpcResponseHandler.startTimer();
assertThrows(Exception.class, () -> grpcResponseHandler.onNext(message));

ArgumentCaptor<IllegalArgumentException> exceptionCaptor = ArgumentCaptor.forClass(IllegalArgumentException.class);
verify(errorReporter, times(1)).reportFatalException(exceptionCaptor.capture());
assertEquals("Field Descriptor not found for field: driver_pickup_location.invalid_address", exceptionCaptor.getValue().getMessage());

ArgumentCaptor<IllegalArgumentException> exceptionCaptor2 = ArgumentCaptor.forClass(IllegalArgumentException.class);
verify(resultFuture, times(1)).completeExceptionally(exceptionCaptor2.capture());
assertEquals("Field Descriptor not found for field: driver_pickup_location.invalid_address", exceptionCaptor2.getValue().getMessage());
}
@Test
public void shouldThrowErrorWhenNestedParentFieldIsNotPresentInOutputDescriptor() throws InvalidProtocolBufferException {
descriptor = TestBookingLogMessage.getDescriptor();
outputMapping.put("driver-pickup-location.address", new OutputMapping("$.driver_pickup_location.address"));
TestLocation location = TestLocation.newBuilder().setAddress("Indonesia").setName("GojekTech").build();
TestBookingLogMessage bookingLogMessage = TestBookingLogMessage.newBuilder().setDriverPickupLocation(location).setCustomerId("123456").build();

grpcSourceConfig = new GrpcSourceConfigBuilder().setEndpoint("localhost").setServicePort(8000).setGrpcRequestProtoSchema("com.gotocompany.dagger.consumer.TestGrpcRequest").setGrpcResponseProtoSchema("com.gotocompany.dagger.consumer.TestGrpcResponse").setGrpcMethodUrl("com.gotocompany.dagger.consumer.test/TestMethod").setRequestPattern("{\"key\": \"%s\"}").setRequestVariables("customer_id").setOutputMapping(outputMapping).createGrpcSourceConfig();
grpcSourceConfig.setRetainResponseType(false);

outputColumnNames = Arrays.asList("driver-pickup-location.address");
columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames);

DynamicMessage message = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), bookingLogMessage.toByteArray());
GrpcResponseHandler grpcResponseHandler = new GrpcResponseHandler(grpcSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry());

Row resultStreamData = new Row(2);
Row outputData = new Row(2);
outputData.setField(0, "Indonesia");
outputData.setField(1, "GojekTech");
resultStreamData.setField(0, inputData);
resultStreamData.setField(1, outputData);

grpcResponseHandler.startTimer();
assertThrows(Exception.class, () -> grpcResponseHandler.onNext(message));

ArgumentCaptor<IllegalArgumentException> exceptionCaptor = ArgumentCaptor.forClass(IllegalArgumentException.class);
verify(errorReporter, times(1)).reportFatalException(exceptionCaptor.capture());
assertEquals("Field Descriptor not found for field: driver-pickup-location in the proto of com.gotocompany.dagger.consumer.TestBookingLogMessage", exceptionCaptor.getValue().getMessage());

ArgumentCaptor<IllegalArgumentException> exceptionCaptor2 = ArgumentCaptor.forClass(IllegalArgumentException.class);
verify(resultFuture, times(1)).completeExceptionally(exceptionCaptor2.capture());
assertEquals("Field Descriptor not found for field: driver-pickup-location in the proto of com.gotocompany.dagger.consumer.TestBookingLogMessage", exceptionCaptor2.getValue().getMessage());
}

}
Loading