1212 */
1313package org .asynchttpclient .providers .grizzly ;
1414
15- import org .asynchttpclient .Body ;
16- import org .asynchttpclient .BodyGenerator ;
1715import java .io .IOException ;
1816import java .nio .ByteBuffer ;
1917import java .util .Queue ;
2018import java .util .concurrent .ConcurrentLinkedQueue ;
19+ import java .util .concurrent .ExecutionException ;
2120import java .util .concurrent .atomic .AtomicInteger ;
21+
22+ import org .asynchttpclient .Body ;
23+ import org .asynchttpclient .BodyGenerator ;
2224import org .glassfish .grizzly .Buffer ;
25+ import org .glassfish .grizzly .Connection ;
26+ import org .glassfish .grizzly .WriteHandler ;
2327import org .glassfish .grizzly .filterchain .FilterChainContext ;
2428import org .glassfish .grizzly .http .HttpContent ;
2529import org .glassfish .grizzly .http .HttpRequestPacket ;
30+ import org .glassfish .grizzly .impl .FutureImpl ;
31+ import org .glassfish .grizzly .utils .Futures ;
32+
33+ import static java .lang .Boolean .TRUE ;
34+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
35+ import static org .glassfish .grizzly .utils .Exceptions .*;
2636
2737/**
2838 * {@link BodyGenerator} which may return just part of the payload at the time
@@ -38,51 +48,145 @@ public class FeedableBodyGenerator implements BodyGenerator {
3848
3949 private volatile HttpRequestPacket requestPacket ;
4050 private volatile FilterChainContext context ;
41-
51+ private volatile HttpContent .Builder contentBuilder ;
52+
53+ private final EmptyBody EMPTY_BODY = new EmptyBody ();
54+
55+
56+
57+ // ---------------------------------------------- Methods from BodyGenerator
58+
59+
4260 @ Override
4361 public Body createBody () throws IOException {
44- return new EmptyBody () ;
62+ return EMPTY_BODY ;
4563 }
46-
64+
65+
66+ // ---------------------------------------------------------- Public Methods
67+
68+
69+ /**
70+ * Feeds the specified buffer. This buffer may be queued to be sent later
71+ * or sent immediately. Note that this method may block if data is being
72+ * fed faster than it is being consumed by the peer.
73+ *
74+ * The maximum duration that this method may block is dependent on
75+ * the current value of {@link org.glassfish.grizzly.Transport#getWriteTimeout(java.util.concurrent.TimeUnit)}.
76+ * This value can be customized by using a {@link TransportCustomizer} to
77+ * fine-tune the transport used by the client instance.
78+ *
79+ * @param buffer the {@link Buffer} to feed.
80+ * @param last flag indicating if this is the final buffer of the message.
81+ * @throws IOException if an I/O error occurs.
82+ *
83+ * @see TransportCustomizer
84+ * @see GrizzlyAsyncHttpProviderConfig#addProperty(GrizzlyAsyncHttpProviderConfig.Property, Object)
85+ * @see GrizzlyAsyncHttpProviderConfig.Property#TRANSPORT_CUSTOMIZER
86+ */
4787 @ SuppressWarnings ("UnusedDeclaration" )
48- public void feed (final Buffer buffer , final boolean isLast )
49- throws IOException {
50- queue .offer (new BodyPart (buffer , isLast ));
88+ public void feed (final Buffer buffer , final boolean last )
89+ throws IOException {
90+ queue .offer (new BodyPart (buffer , last ));
5191 queueSize .incrementAndGet ();
5292
5393 if (context != null ) {
54- flushQueue ();
94+ flushQueue (true );
5595 }
5696 }
57-
97+
5898 public void initializeAsynchronousTransfer (final FilterChainContext context ,
59- final HttpRequestPacket requestPacket ) {
99+ final HttpRequestPacket requestPacket )
100+ throws IOException {
60101 this .context = context ;
61102 this .requestPacket = requestPacket ;
62- flushQueue ();
103+ this .contentBuilder = HttpContent .builder (requestPacket );
104+ // don't block here. If queue is full at the time of the next feed()
105+ // call, it will block.
106+ flushQueue (false );
63107 }
64108
109+
110+ // --------------------------------------------------------- Private Methods
111+
112+
65113 @ SuppressWarnings ("unchecked" )
66- private void flushQueue () {
114+ private void flushQueue (final boolean allowBlocking ) throws IOException {
67115 if (queueSize .get () > 0 ) {
68116 synchronized (this ) {
117+ final Connection c = context .getConnection ();
69118 while (queueSize .get () > 0 ) {
119+ if (allowBlocking ) {
120+ blockUntilQueueFree (c );
121+ }
70122 final BodyPart bodyPart = queue .poll ();
71123 queueSize .decrementAndGet ();
72124 final HttpContent content =
73- requestPacket .httpContentBuilder ()
74- .content (bodyPart .buffer )
75- .last (bodyPart .isLast )
125+ contentBuilder .content (bodyPart .buffer )
126+ .last (bodyPart .isLast )
76127 .build ();
77- context .write (content , ((!requestPacket .isCommitted ()) ?
78- context .getTransportContext ().getCompletionHandler () :
79- null ));
80-
128+ context .write (content ,
129+ ((!requestPacket .isCommitted ())
130+ ? context .getTransportContext ()
131+ .getCompletionHandler ()
132+ : null ));
81133 }
82134 }
83135 }
84136 }
85-
137+
138+ /**
139+ * This method will block if the async write queue is currently larger
140+ * than the configured maximum. The amount of time that this method
141+ * will block is dependent on the write timeout of the transport
142+ * associated with the specified connection.
143+ */
144+ private void blockUntilQueueFree (final Connection c ) {
145+ if (!c .canWrite ()) {
146+ final FutureImpl <Boolean > future =
147+ Futures .createSafeFuture ();
148+
149+ // Connection may be obtained by calling FilterChainContext.getConnection().
150+ c .notifyCanWrite (new WriteHandler () {
151+
152+ @ Override
153+ public void onWritePossible () throws Exception {
154+ future .result (TRUE );
155+ }
156+
157+ @ Override
158+ public void onError (Throwable t ) {
159+ future .failure (makeIOException (t ));
160+ }
161+ });
162+
163+ block (c , future );
164+ }
165+ }
166+
167+ private void block (final Connection c ,
168+ final FutureImpl <Boolean > future ) {
169+ try {
170+ final long writeTimeout =
171+ c .getTransport ().getWriteTimeout (MILLISECONDS );
172+ if (writeTimeout != -1 ) {
173+ future .get (writeTimeout , MILLISECONDS );
174+ } else {
175+ future .get ();
176+ }
177+ } catch (ExecutionException e ) {
178+ HttpTransactionContext httpCtx = HttpTransactionContext .get (c );
179+ httpCtx .abort (e .getCause ());
180+ } catch (Exception e ) {
181+ HttpTransactionContext httpCtx = HttpTransactionContext .get (c );
182+ httpCtx .abort (e );
183+ }
184+ }
185+
186+
187+ // ----------------------------------------------------------- Inner Classes
188+
189+
86190 private final class EmptyBody implements Body {
87191
88192 @ Override
@@ -100,9 +204,14 @@ public void close() throws IOException {
100204 context .completeAndRecycle ();
101205 context = null ;
102206 requestPacket = null ;
207+ contentBuilder = null ;
103208 }
104209 }
105-
210+
211+
212+ // ---------------------------------------------------------- Nested Classes
213+
214+
106215 private final static class BodyPart {
107216 private final boolean isLast ;
108217 private final Buffer buffer ;
0 commit comments