@@ -2374,59 +2374,68 @@ func testTransportStreamEndsWhileBodyIsBeingWritten(t testing.TB) {
23742374 rt .wantStatus (413 )
23752375}
23762376
2377- // https://golang.org/issue/15930
2378- func TestTransportFlowControl (t * testing.T ) {
2379- const bufLen = 64 << 10
2380- var total int64 = 100 << 20 // 100MB
2381- if testing .Short () {
2382- total = 10 << 20
2383- }
2384-
2385- var wrote int64 // updated atomically
2386- ts := newTestServer (t , func (w http.ResponseWriter , r * http.Request ) {
2387- b := make ([]byte , bufLen )
2388- for wrote < total {
2389- n , err := w .Write (b )
2390- atomic .AddInt64 (& wrote , int64 (n ))
2391- if err != nil {
2392- t .Errorf ("ResponseWriter.Write error: %v" , err )
2393- break
2394- }
2395- w .(http.Flusher ).Flush ()
2377+ func TestTransportFlowControl (t * testing.T ) { synctestTest (t , testTransportFlowControl ) }
2378+ func testTransportFlowControl (t testing.TB ) {
2379+ const maxBuffer = 64 << 10 // 64KiB
2380+ tc := newTestClientConn (t , func (tr * http.Transport ) {
2381+ tr .HTTP2 = & http.HTTP2Config {
2382+ MaxReceiveBufferPerConnection : maxBuffer ,
2383+ MaxReceiveBufferPerStream : maxBuffer ,
2384+ MaxReadFrameSize : 16 << 20 , // 16MiB
23962385 }
23972386 })
2387+ tc .greet ()
23982388
2399- tr := & Transport {TLSClientConfig : tlsConfigInsecure }
2400- defer tr .CloseIdleConnections ()
2401- req , err := http .NewRequest ("GET" , ts .URL , nil )
2402- if err != nil {
2403- t .Fatal ("NewRequest error:" , err )
2404- }
2405- resp , err := tr .RoundTrip (req )
2406- if err != nil {
2407- t .Fatal ("RoundTrip error:" , err )
2408- }
2409- defer resp .Body .Close ()
2389+ req , _ := http .NewRequest ("GET" , "https://dummy.tld/" , nil )
2390+ rt := tc .roundTrip (req )
2391+ tc .wantFrameType (FrameHeaders )
2392+
2393+ tc .writeHeaders (HeadersFrameParam {
2394+ StreamID : rt .streamID (),
2395+ EndHeaders : true ,
2396+ EndStream : false ,
2397+ BlockFragment : tc .makeHeaderBlockFragment (
2398+ ":status" , "200" ,
2399+ ),
2400+ })
2401+ rt .wantStatus (200 )
24102402
2411- var read int64
2412- b := make ([]byte , bufLen )
2403+ // Server fills up its transmit buffer.
2404+ // The client does not provide more flow control tokens,
2405+ // since the data hasn't been consumed by the user.
2406+ tc .writeData (rt .streamID (), false , make ([]byte , maxBuffer ))
2407+ tc .wantIdle ()
2408+
2409+ // User reads data from the response body.
2410+ // The client sends more flow control tokens.
2411+ resp := rt .response ()
2412+ if _ , err := io .ReadFull (resp .Body , make ([]byte , maxBuffer )); err != nil {
2413+ t .Fatalf ("io.Body.Read: %v" , err )
2414+ }
2415+ var connTokens , streamTokens uint32
24132416 for {
2414- n , err := resp . Body . Read ( b )
2415- if err == io . EOF {
2417+ f := tc . readFrame ( )
2418+ if f == nil {
24162419 break
24172420 }
2418- if err != nil {
2419- t .Fatal ("Read error:" , err )
2421+ wu , ok := f .(* WindowUpdateFrame )
2422+ if ! ok {
2423+ t .Fatalf ("received unexpected frame %T (want WINDOW_UPDATE)" , f )
24202424 }
2421- read += int64 (n )
2422-
2423- const max = transportDefaultStreamFlow
2424- if w := atomic .LoadInt64 (& wrote ); - max > read - w || read - w > max {
2425- t .Fatalf ("Too much data inflight: server wrote %v bytes but client only received %v" , w , read )
2425+ switch wu .StreamID {
2426+ case 0 :
2427+ connTokens += wu .Increment
2428+ case wu .StreamID :
2429+ streamTokens += wu .Increment
2430+ default :
2431+ t .Fatalf ("received unexpected WINDOW_UPDATE for stream %v" , wu .StreamID )
24262432 }
2427-
2428- // Let the server get ahead of the client.
2429- time .Sleep (1 * time .Millisecond )
2433+ }
2434+ if got , want := connTokens , uint32 (maxBuffer ); got != want {
2435+ t .Errorf ("transport provided %v bytes of connection WINDOW_UPDATE, want %v" , got , want )
2436+ }
2437+ if got , want := streamTokens , uint32 (maxBuffer ); got != want {
2438+ t .Errorf ("transport provided %v bytes of stream WINDOW_UPDATE, want %v" , got , want )
24302439 }
24312440}
24322441
0 commit comments