Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions stub/src/main/java/io/grpc/stub/ServerCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ private static final class ServerCallStreamObserverImpl<ReqT, RespT>
private Runnable onCancelHandler;
private boolean aborted = false;
private boolean completed = false;
private RespT unaryResponse;

// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call) {
Expand Down Expand Up @@ -348,7 +349,13 @@ public void onNext(RespT response) {
call.sendHeaders(new Metadata());
sentHeaders = true;
}
call.sendMessage(response);

if (!call.getMethodDescriptor().getType().serverSendsOneMessage()) {
call.sendMessage(response);
} else {
// delay the sendMessage() to onComplete()/onError()
unaryResponse = response;
Copy link
Contributor

@ST-DDT ST-DDT Oct 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this should throw an exception if the server sends multiple responses/overwrites the previous one.

}
}

@Override
Expand All @@ -368,8 +375,28 @@ public void onCompleted() {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
}
} else {
call.close(Status.OK, new Metadata());
completed = true;

if (call.getMethodDescriptor().getType().serverSendsOneMessage()) {
Throwable error = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error should be logged to the server log.

if (unaryResponse != null) {
try {
call.sendMessage(unaryResponse);
} catch (Throwable t) {
error = t;
}
} else {
error = Status.INTERNAL.withDescription("Response message is null for unary call")
.asRuntimeException();
}
if (error != null) {
onError(error);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error is sent to the client. We should not send either of the two possible error values, as we don't want to send exceptions to clients for the application (it's bad practice that we don't want to encourage) and the other message doesn't make sense to the client.

What we should do instead is slightly unclear, but we can talk about it offline.

}
}

if (!aborted) {
call.close(Status.OK, new Metadata());
completed = true;
}
}
}

Expand Down
75 changes: 75 additions & 0 deletions stub/src/test/java/io/grpc/stub/ServerCallsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.grpc.ServerServiceDefinition;
import io.grpc.ServiceDescriptor;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
Expand Down Expand Up @@ -439,6 +440,80 @@ public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
listener.onHalfClose();
}

@Test
public void serverUnaryMultipleMsgGetErrorWithoutMsg() {

ServerCallHandler<Integer, Integer> serverCallHandler =
ServerCalls.asyncUnaryCall(
new ServerCalls.UnaryMethod<Integer, Integer>() {
@Override
public void invoke(Integer request, StreamObserver<Integer> responseObserver) {
responseObserver.onNext(request);
responseObserver.onCompleted();
}
});
ServerCall.Listener<Integer> callListener =
serverCallHandler.startCall(serverCall, new Metadata());
serverCall.isReady = true;
serverCall.isCancelled = false;
callListener.onReady();
callListener.onMessage(1);
callListener.onMessage(1);
callListener.onHalfClose();
assertThat(serverCall.status.getCode()).isEqualTo(Code.INTERNAL);
assertThat(serverCall.status.getDescription())
.isEqualTo("Too many requests");
assertThat(serverCall.responses).isEmpty();
}

@Test
public void serverUnaryNullMsgGetErrorWithoutMsg() {

ServerCallHandler<Integer, Integer> serverCallHandler =
ServerCalls.asyncUnaryCall(
new ServerCalls.UnaryMethod<Integer, Integer>() {
@Override
public void invoke(Integer request, StreamObserver<Integer> responseObserver) {
responseObserver.onNext(null);
responseObserver.onCompleted();
}
});
ServerCall.Listener<Integer> callListener =
serverCallHandler.startCall(serverCall, new Metadata());
serverCall.isReady = true;
serverCall.isCancelled = false;
callListener.onReady();
callListener.onMessage(1);
callListener.onHalfClose();
assertThat(serverCall.status.getCode()).isEqualTo(Code.INTERNAL);
assertThat(serverCall.status.getDescription())
.isEqualTo("Response message is null for unary call");
assertThat(serverCall.responses).isEmpty();
}

@Test
public void serverUnaryMsgGetOkWithMsg() {

ServerCallHandler<Integer, Integer> serverCallHandler =
ServerCalls.asyncUnaryCall(
new ServerCalls.UnaryMethod<Integer, Integer>() {
@Override
public void invoke(Integer request, StreamObserver<Integer> responseObserver) {
responseObserver.onNext(request);
responseObserver.onCompleted();
}
});
ServerCall.Listener<Integer> callListener =
serverCallHandler.startCall(serverCall, new Metadata());
serverCall.isReady = true;
serverCall.isCancelled = false;
callListener.onReady();
callListener.onMessage(132);
callListener.onHalfClose();
assertThat(serverCall.status.getCode()).isEqualTo(Code.OK);
assertThat(serverCall.responses).containsExactly(132);
}

@Test
public void inprocessTransportManualFlow() throws Exception {
final Semaphore semaphore = new Semaphore(1);
Expand Down