@@ -141,6 +141,7 @@ private enum ReadStatus {
141141 private Runnable fireChannelWritabilityChangedTask ;
142142
143143 private boolean outboundClosed ;
144+ private int flowControlledBytes ;
144145
145146 /**
146147 * This variable represents if a read is in progress for the current channel or was requested.
@@ -154,13 +155,7 @@ private enum ReadStatus {
154155
155156 /** {@code true} after the first HEADERS frame has been written **/
156157 private boolean firstFrameWritten ;
157-
158- // Currently the child channel and parent channel are always on the same EventLoop thread. This allows us to
159- // extend the read loop of a child channel if the child channel drains its queued data during read, and the
160- // parent channel is still in its read loop. The next/previous links build a doubly linked list that the parent
161- // channel will iterate in its channelReadComplete to end the read cycle for each child channel in the list.
162- AbstractHttp2StreamChannel next ;
163- AbstractHttp2StreamChannel previous ;
158+ private boolean readCompletePending ;
164159
165160 AbstractHttp2StreamChannel (DefaultHttp2FrameStream stream , int id , ChannelHandler inboundHandler ) {
166161 this .stream = stream ;
@@ -535,16 +530,18 @@ void fireChildRead(Http2Frame frame) {
535530 // otherwise we would have drained it from the queue and processed it during the read cycle.
536531 assert inboundBuffer == null || inboundBuffer .isEmpty ();
537532 final RecvByteBufAllocator .Handle allocHandle = unsafe .recvBufAllocHandle ();
538- unsafe .doRead0 (frame , allocHandle );
533+ flowControlledBytes += unsafe .doRead0 (frame , allocHandle );
539534 // We currently don't need to check for readEOS because the parent channel and child channel are limited
540535 // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
541536 // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
542537 // cost of additional readComplete notifications on the rare path.
543538 if (allocHandle .continueReading ()) {
544- tryAddChildChannelToReadPendingQueue ();
539+ if (!readCompletePending ) {
540+ readCompletePending = true ;
541+ addChannelToReadCompletePendingQueue ();
542+ }
545543 } else {
546- tryRemoveChildChannelFromReadPendingQueue ();
547- unsafe .notifyReadComplete (allocHandle );
544+ unsafe .notifyReadComplete (allocHandle , true );
548545 }
549546 } else {
550547 if (inboundBuffer == null ) {
@@ -556,8 +553,8 @@ void fireChildRead(Http2Frame frame) {
556553
557554 void fireChildReadComplete () {
558555 assert eventLoop ().inEventLoop ();
559- assert readStatus != ReadStatus .IDLE ;
560- unsafe .notifyReadComplete (unsafe .recvBufAllocHandle ());
556+ assert readStatus != ReadStatus .IDLE || ! readCompletePending ;
557+ unsafe .notifyReadComplete (unsafe .recvBufAllocHandle (), false );
561558 }
562559
563560 private final class Http2ChannelUnsafe implements Unsafe {
@@ -651,14 +648,16 @@ public void operationComplete(ChannelFuture future) {
651648 return ;
652649 }
653650 closeInitiated = true ;
654-
655- tryRemoveChildChannelFromReadPendingQueue () ;
651+ // Just set to false as removing from an underlying queue would even be more expensive.
652+ readCompletePending = false ;
656653
657654 final boolean wasActive = isActive ();
658655
659- // Only ever send a reset frame if the connection is still alive and if the stream may have existed
656+ updateLocalWindowIfNeeded ();
657+
658+ // Only ever send a reset frame if the connection is still alive and if the stream was created before
660659 // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
661- if (parent ().isActive () && !readEOS && streamMayHaveExisted (stream ())) {
660+ if (parent ().isActive () && !readEOS && Http2CodecUtil . isStreamIdValid (stream . id ())) {
662661 Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame (Http2Error .CANCEL ).stream (stream ());
663662 write (resetFrame , unsafe ().voidPromise ());
664663 flush ();
@@ -782,7 +781,7 @@ void doBeginRead() {
782781 allocHandle .reset (config ());
783782 boolean continueReading = false ;
784783 do {
785- doRead0 ((Http2Frame ) message , allocHandle );
784+ flowControlledBytes += doRead0 ((Http2Frame ) message , allocHandle );
786785 } while ((readEOS || (continueReading = allocHandle .continueReading ())) &&
787786 (message = inboundBuffer .poll ()) != null );
788787
@@ -791,10 +790,12 @@ void doBeginRead() {
791790 // currently reading it is possile that more frames will be delivered to this child channel. In
792791 // the case that this child channel still wants to read we delay the channelReadComplete on this
793792 // child channel until the parent is done reading.
794- boolean added = tryAddChildChannelToReadPendingQueue ();
795- assert added ;
793+ if (!readCompletePending ) {
794+ readCompletePending = true ;
795+ addChannelToReadCompletePendingQueue ();
796+ }
796797 } else {
797- notifyReadComplete (allocHandle );
798+ notifyReadComplete (allocHandle , true );
798799 }
799800 }
800801 }
@@ -803,13 +804,30 @@ void readEOS() {
803804 readEOS = true ;
804805 }
805806
806- void notifyReadComplete (RecvByteBufAllocator .Handle allocHandle ) {
807- assert next == null && previous == null ;
807+ private void updateLocalWindowIfNeeded () {
808+ if (flowControlledBytes != 0 ) {
809+ int bytes = flowControlledBytes ;
810+ flowControlledBytes = 0 ;
811+ write0 (parentContext (), new DefaultHttp2WindowUpdateFrame (bytes ).stream (stream ));
812+ writeDoneAndNoFlush = true ;
813+ }
814+ }
815+
816+ void notifyReadComplete (RecvByteBufAllocator .Handle allocHandle , boolean forceReadComplete ) {
817+ if (!readCompletePending && !forceReadComplete ) {
818+ return ;
819+ }
820+ // Set to false just in case we added the channel multiple times before.
821+ readCompletePending = false ;
822+
808823 if (readStatus == ReadStatus .REQUESTED ) {
809824 readStatus = ReadStatus .IN_PROGRESS ;
810825 } else {
811826 readStatus = ReadStatus .IDLE ;
812827 }
828+
829+ updateLocalWindowIfNeeded ();
830+
813831 allocHandle .readComplete ();
814832 pipeline ().fireChannelReadComplete ();
815833 // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
@@ -822,29 +840,20 @@ void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle) {
822840 }
823841
824842 @ SuppressWarnings ("deprecation" )
825- void doRead0 (Http2Frame frame , RecvByteBufAllocator .Handle allocHandle ) {
843+ int doRead0 (Http2Frame frame , RecvByteBufAllocator .Handle allocHandle ) {
826844 pipeline ().fireChannelRead (frame );
827845 allocHandle .incMessagesRead (1 );
828846
829847 if (frame instanceof Http2DataFrame ) {
830848 final int numBytesToBeConsumed = ((Http2DataFrame ) frame ).initialFlowControlledBytes ();
831849 allocHandle .attemptedBytesRead (numBytesToBeConsumed );
832850 allocHandle .lastBytesRead (numBytesToBeConsumed );
833- if (numBytesToBeConsumed != 0 ) {
834- try {
835- if (consumeBytes (stream , numBytesToBeConsumed )) {
836- // We wrote some WINDOW_UPDATE frame, so we may need to do a flush.
837- writeDoneAndNoFlush = true ;
838- flush ();
839- }
840- } catch (Http2Exception e ) {
841- pipeline ().fireExceptionCaught (e );
842- }
843- }
851+ return numBytesToBeConsumed ;
844852 } else {
845853 allocHandle .attemptedBytesRead (MIN_HTTP2_FRAME_SIZE );
846854 allocHandle .lastBytesRead (MIN_HTTP2_FRAME_SIZE );
847855 }
856+ return 0 ;
848857 }
849858
850859 @ Override
@@ -1041,10 +1050,7 @@ protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
10411050 return promise ;
10421051 }
10431052
1044- protected abstract boolean consumeBytes (Http2FrameStream stream , int bytes ) throws Http2Exception ;
10451053 protected abstract boolean isParentReadInProgress ();
1046- protected abstract boolean streamMayHaveExisted (Http2FrameStream stream );
1047- protected abstract void tryRemoveChildChannelFromReadPendingQueue ();
1048- protected abstract boolean tryAddChildChannelToReadPendingQueue ();
1054+ protected abstract void addChannelToReadCompletePendingQueue ();
10491055 protected abstract ChannelHandlerContext parentContext ();
10501056}
0 commit comments