Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ subprojects {
protobuf_plugin: 'com.google.protobuf:protobuf-gradle-plugin:0.8.0',
protobuf_util: "com.google.protobuf:protobuf-java-util:${protobufVersion}",

netty: 'io.netty:netty-codec-http2:[4.1.4.Final]',
netty: 'io.netty:netty-codec-http2:[4.1.6.Final-SNAPSHOT]',
netty_epoll: 'io.netty:netty-transport-native-epoll:4.1.4.Final' + epoll_suffix,
netty_tcnative: 'io.netty:netty-tcnative-boringssl-static:1.1.33.Fork19',
netty_tcnative: 'io.netty:netty-tcnative-boringssl-static:1.1.33.Fork22',

// Test dependencies.
junit: 'junit:junit:4.11',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ protected TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx) {
super(maxMessageSize, statsTraceCtx);
}

public boolean isClosed() {
return listenerClosed;
}

@VisibleForTesting
public final void setListener(ClientStreamListener listener) {
Preconditions.checkState(this.listener == null, "Already called setListener");
Expand Down
125 changes: 54 additions & 71 deletions netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,95 +33,69 @@

import static io.netty.buffer.Unpooled.directBuffer;
import static io.netty.buffer.Unpooled.unreleasableBuffer;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
import static io.netty.handler.codec.http2.Http2Stream2.CONNECTION_STREAM;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.annotations.VisibleForTesting;

import io.grpc.Attributes;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.DefaultHttp2PingFrame;
import io.netty.handler.codec.http2.DefaultHttp2SettingsFrame;
import io.netty.handler.codec.http2.DefaultHttp2WindowUpdateFrame;
import io.netty.handler.codec.http2.Http2ChannelDuplexHandler;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream;

import java.util.concurrent.TimeUnit;

/**
* Base class for all Netty gRPC handlers. This class standardizes exception handling (always
* shutdown the connection) as well as sending the initial connection window at startup.
*/
abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
private static long GRACEFUL_SHUTDOWN_TIMEOUT = SECONDS.toMillis(5);
private boolean autoTuneFlowControlOn = false;
private int initialConnectionWindow;
abstract class AbstractNettyHandler extends Http2ChannelDuplexHandler {
private static final long GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS = 5;
private boolean autoTuneFlowControlOn;
private ChannelHandlerContext ctx;
private final FlowControlPinger flowControlPing = new FlowControlPinger();
private final FlowControlPinger flowControlPing;
private final Http2FrameCodec frameCodec;

private static final int BDP_MEASUREMENT_PING = 1234;
private static final ByteBuf payloadBuf =
unreleasableBuffer(directBuffer(8).writeLong(BDP_MEASUREMENT_PING));

AbstractNettyHandler(Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
super(decoder, encoder, initialSettings);

// Set the timeout for graceful shutdown.
gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_TIMEOUT);

// Extract the connection window from the settings if it was set.
this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 :
initialSettings.initialWindowSize();
AbstractNettyHandler(Http2FrameCodecBuilder frameCodecBuilder, int initialConnectionWindow) {
frameCodecBuilder.gracefulShutdownTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS, SECONDS);
frameCodec = frameCodecBuilder.build();
flowControlPing = new FlowControlPinger(initialConnectionWindow);
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
// Sends the connection preface if we haven't already.
ctx.pipeline().addBefore(ctx.executor(), ctx.name(), null, frameCodec);
super.handlerAdded(ctx);
sendInitialConnectionWindow();
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Sends connection preface if we haven't already.
super.channelActive(ctx);
sendInitialConnectionWindow();
/**
* Triggered on protocol negotiation completion.
*
* <p>It must me called after negotiation is completed but before given handler is added to the
* channel.
*
* @param attrs arbitrary attributes passed after protocol negotiation (eg. SSLSession).
*/
public void handleProtocolNegotiationCompleted(Attributes attrs) {
}

@Override
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Http2Exception embedded = getEmbeddedHttp2Exception(cause);
if (embedded == null) {
// There was no embedded Http2Exception, assume it's a connection error. Subclasses are
// responsible for storing the appropriate status and shutting down the connection.
onError(ctx, cause);
} else {
super.exceptionCaught(ctx, cause);
}
Http2FrameCodec frameCodec() {
return frameCodec;
}

protected final ChannelHandlerContext ctx() {
return ctx;
}

/**
* Sends initial connection window to the remote endpoint if necessary.
*/
private void sendInitialConnectionWindow() throws Http2Exception {
if (ctx.channel().isActive() && initialConnectionWindow > 0) {
Http2Stream connectionStream = connection().connectionStream();
int currentSize = connection().local().flowController().windowSize(connectionStream);
int delta = initialConnectionWindow - currentSize;
decoder().flowController().incrementWindowSize(connectionStream, delta);
initialConnectionWindow = -1;
ctx.flush();
}
}

@VisibleForTesting
FlowControlPinger flowControlPing() {
return flowControlPing;
Expand All @@ -145,6 +119,12 @@ final class FlowControlPinger {
private float lastBandwidth; // bytes per second
private long lastPingTime;

private int initialConnectionWindow;

FlowControlPinger(int initialConnectionWindow) {
this.initialConnectionWindow = initialConnectionWindow;
}

public int payload() {
return BDP_MEASUREMENT_PING;
}
Expand All @@ -159,36 +139,35 @@ public void onDataRead(int dataLength, int paddingLength) {
}
if (!isPinging()) {
setPinging(true);
sendPing(ctx());
sendPing();
}
incrementDataSincePing(dataLength + paddingLength);
}

public void updateWindow() throws Http2Exception {
public void updateWindow() {
if (!autoTuneFlowControlOn) {
return;
}
pingReturn++;
long elapsedTime = (System.nanoTime() - lastPingTime);
long elapsedTime = System.nanoTime() - lastPingTime;
if (elapsedTime == 0) {
elapsedTime = 1;
}
long bandwidth = (getDataSincePing() * TimeUnit.SECONDS.toNanos(1)) / elapsedTime;
Http2LocalFlowController fc = decoder().flowController();
long bandwidth = getDataSincePing() * SECONDS.toNanos(1) / elapsedTime;
// Calculate new window size by doubling the observed BDP, but cap at max window
int targetWindow = Math.min(getDataSincePing() * 2, MAX_WINDOW_SIZE);
setPinging(false);
int currentWindow = fc.initialWindowSize(connection().connectionStream());
if (targetWindow > currentWindow && bandwidth > lastBandwidth) {
if (targetWindow > initialConnectionWindow && bandwidth > lastBandwidth) {
lastBandwidth = bandwidth;
int increase = targetWindow - currentWindow;
fc.incrementWindowSize(connection().connectionStream(), increase);
fc.initialWindowSize(targetWindow);
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(targetWindow);
frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
}
int increase = targetWindow - initialConnectionWindow;

ctx().write(new DefaultHttp2WindowUpdateFrame(increase).stream(CONNECTION_STREAM));
Http2Settings updateInitialWindowSize = new Http2Settings().initialWindowSize(targetWindow);
ctx().write(new DefaultHttp2SettingsFrame(updateInitialWindowSize));

// TODO(buchgr): Only set this on settings ack?
initialConnectionWindow = targetWindow;
}
}

private boolean isPinging() {
Expand All @@ -199,10 +178,10 @@ private void setPinging(boolean pingOut) {
pinging = pingOut;
}

private void sendPing(ChannelHandlerContext ctx) {
private void sendPing() {
setDataSizeSincePing(0);
lastPingTime = System.nanoTime();
encoder().writePing(ctx, false, payloadBuf.slice(), ctx.newPromise());
ctx().write(new DefaultHttp2PingFrame(payloadBuf.slice()));
pingCount++;
}

Expand All @@ -211,6 +190,10 @@ private void incrementDataSincePing(int increase) {
setDataSizeSincePing(currentSize + increase);
}

int initialConnectionWindow() {
return initialConnectionWindow;
}

@VisibleForTesting
int getPingCount() {
return pingCount;
Expand Down
18 changes: 10 additions & 8 deletions netty/src/main/java/io/grpc/netty/CancelServerStreamCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,23 @@
import com.google.common.base.Preconditions;

import io.grpc.Status;
import io.netty.handler.codec.http2.Http2Stream2;

/**
* Command sent from a Netty server stream to the handler to cancel the stream.
*/
class CancelServerStreamCommand extends WriteQueue.AbstractQueuedCommand {
private final NettyServerStream.TransportState stream;
private final Http2Stream2 http2Stream;
private final Status reason;

CancelServerStreamCommand(NettyServerStream.TransportState stream, Status reason) {
this.stream = Preconditions.checkNotNull(stream, "stream");
CancelServerStreamCommand(Http2Stream2 http2Stream, Status reason) {
this.http2Stream = Preconditions.checkNotNull(http2Stream, "http2Stream");
this.reason = Preconditions.checkNotNull(reason, "reason");
}

NettyServerStream.TransportState stream() {
return stream;
@SuppressWarnings("unchecked")
Http2Stream2 http2Stream() {
return http2Stream;
}

Status reason() {
Expand All @@ -68,19 +70,19 @@ public boolean equals(Object o) {

CancelServerStreamCommand that = (CancelServerStreamCommand) o;

return Objects.equal(this.stream, that.stream)
return Objects.equal(this.http2Stream, that.http2Stream)
&& Objects.equal(this.reason, that.reason);
}

@Override
public int hashCode() {
return Objects.hashCode(stream, reason);
return Objects.hashCode(http2Stream, reason);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("stream", stream)
.add("stream", http2Stream)
.add("reason", reason)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ final class ClientTransportLifecycleManager {
private final ManagedClientTransport.Listener listener;
private boolean transportReady;
private boolean transportShutdown;
private boolean transportInUse;
private int transportUsers;
/** null iff !transportShutdown. */
private Status shutdownStatus;
/** null iff !transportShutdown. */
Expand Down Expand Up @@ -68,12 +68,23 @@ public void notifyShutdown(Status s) {
listener.transportShutdown(s);
}

public void notifyInUse(boolean inUse) {
if (inUse == transportInUse) {
return;
public void notifyNewUser() {
transportUsers++;
if (transportUsers == 1) {
listener.transportInUse(true);
}
}

public void notifyLostUser() {
transportUsers--;

if (transportUsers < 0) {
throw new AssertionError();
}

if (transportUsers == 0) {
listener.transportInUse(false);
}
transportInUse = inUse;
listener.transportInUse(inUse);
}

public void notifyTerminated(Status s) {
Expand Down
Loading