Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
review feedback
  • Loading branch information
squito committed May 31, 2018
commit 83c3271d2f45bbef18d865bddbc6807e9fbd2503
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public void channelInactive() throws Exception {

private void deactivateStream() {
if (handler instanceof TransportResponseHandler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we need to do this for TransportRequestHandler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only purpose of TransportResponseHandler.deactivateStream() is to include the stream request in the count for numOutstandingRequests (its not doing any critical cleanup). I will include a comment here explaining that.

// we only have to do this for TransportResponseHandler as it exposes numOutstandingFetches
// (there is no extra cleanup that needs to happen)
((TransportResponseHandler) handler).deactivateStream();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static StreamResponse decode(ByteBuf buf) {

@Override
public int hashCode() {
return Objects.hashCode(byteCount, streamId, body());
return Objects.hashCode(byteCount, streamId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static UploadStream decode(ByteBuf buf) {

@Override
public int hashCode() {
return Objects.hashCode(requestId, body());
return Long.hashCode(requestId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,17 @@ public class StreamData {
private final TransportRequestHandler handler;
private final TransportFrameDecoder frameDecoder;
private final RpcResponseCallback rpcCallback;
private final ByteBuffer meta;
private final long streamByteCount;
private boolean hasCallback = false;

public StreamData(
TransportRequestHandler handler,
TransportFrameDecoder frameDecoder,
RpcResponseCallback rpcCallback,
ByteBuffer meta,
long streamByteCount) {
this.handler = handler;
this.frameDecoder = frameDecoder;
this.rpcCallback = rpcCallback;
this.meta = meta;
this.streamByteCount = streamByteCount;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void onFailure(Throwable e) {
channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
ByteBuffer meta = req.meta.nioByteBuffer();
StreamData streamData = new StreamData(TransportRequestHandler.this, frameDecoder,
callback, meta, req.bodyByteCount);
callback, req.bodyByteCount);
rpcHandler.receive(reverseClient, meta, streamData, callback);
if (!streamData.hasCallback()) {
throw new RuntimeException("Destination did not register stream handler");
Expand Down