Skip to content

Commit 6b6475f

Browse files
iamalekseynormanmaurer
authored andcommitted
Prevent ByteToMessageDecoder from overreading when !isAutoRead (netty#9252)
Motivation: ByteToMessageDecoder only looks at the last channelRead() in the batch of channelRead()-s when determining whether or not it should call ChannelHandlerContext#read() to consume more data when !isAutoRead. This will lead to read() calls issued unnecessaily and unprompted if the very last channelRead() didn't result in at least one decoded message, even if there have been messages decoded from other channelRead()-s in the current batch. Modifications: Track decode outcomes for the entire batch of channelRead() calls and only issue a read in BTMD if the entire batch of channelRead() calls yielded no complete messages. Result: ByteToMessageDecoder will no longer overread when the very last read yielded no message, but the batch of reads did.
1 parent efe40ac commit 6b6475f

File tree

2 files changed

+66
-7
lines changed

2 files changed

+66
-7
lines changed

codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.buffer.ByteBufAllocator;
2222
import io.netty.buffer.CompositeByteBuf;
2323
import io.netty.buffer.Unpooled;
24+
import io.netty.channel.ChannelConfig;
2425
import io.netty.channel.ChannelHandlerContext;
2526
import io.netty.channel.ChannelInboundHandlerAdapter;
2627
import io.netty.channel.socket.ChannelInputShutdownEvent;
@@ -151,8 +152,14 @@ public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in)
151152
ByteBuf cumulation;
152153
private Cumulator cumulator = MERGE_CUMULATOR;
153154
private boolean singleDecode;
154-
private boolean decodeWasNull;
155155
private boolean first;
156+
157+
/**
158+
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
159+
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
160+
*/
161+
private boolean firedChannelRead;
162+
156163
/**
157164
* A bitmask where the bits are defined as
158165
* <ul>
@@ -291,7 +298,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
291298
}
292299

293300
int size = out.size();
294-
decodeWasNull = !out.insertSinceRecycled();
301+
firedChannelRead |= out.insertSinceRecycled();
295302
fireChannelRead(ctx, out, size);
296303
out.recycle();
297304
}
@@ -326,12 +333,10 @@ static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int
326333
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
327334
numReads = 0;
328335
discardSomeReadBytes();
329-
if (decodeWasNull) {
330-
decodeWasNull = false;
331-
if (!ctx.channel().config().isAutoRead()) {
332-
ctx.read();
333-
}
336+
if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {
337+
ctx.read();
334338
}
339+
firedChannelRead = false;
335340
ctx.fireChannelReadComplete();
336341
}
337342

codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.buffer.UnpooledHeapByteBuf;
2323
import io.netty.channel.ChannelHandlerContext;
2424
import io.netty.channel.ChannelInboundHandlerAdapter;
25+
import io.netty.channel.ChannelOutboundHandlerAdapter;
2526
import io.netty.channel.embedded.EmbeddedChannel;
2627
import io.netty.util.internal.PlatformDependent;
2728
import org.junit.Test;
@@ -30,6 +31,7 @@
3031
import java.util.concurrent.BlockingQueue;
3132
import java.util.concurrent.LinkedBlockingDeque;
3233

34+
import static io.netty.buffer.Unpooled.wrappedBuffer;
3335
import static org.junit.Assert.*;
3436

3537
public class ByteToMessageDecoderTest {
@@ -345,4 +347,56 @@ public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer
345347
assertEquals(0, in.refCnt());
346348
}
347349
}
350+
351+
@Test
352+
public void testDoesNotOverRead() {
353+
class ReadInterceptingHandler extends ChannelOutboundHandlerAdapter {
354+
private int readsTriggered;
355+
356+
@Override
357+
public void read(ChannelHandlerContext ctx) throws Exception {
358+
readsTriggered++;
359+
super.read(ctx);
360+
}
361+
}
362+
ReadInterceptingHandler interceptor = new ReadInterceptingHandler();
363+
364+
EmbeddedChannel channel = new EmbeddedChannel();
365+
channel.config().setAutoRead(false);
366+
channel.pipeline().addLast(interceptor, new FixedLengthFrameDecoder(3));
367+
assertEquals(0, interceptor.readsTriggered);
368+
369+
// 0 complete frames, 1 partial frame: SHOULD trigger a read
370+
channel.writeInbound(wrappedBuffer(new byte[] { 0, 1 }));
371+
assertEquals(1, interceptor.readsTriggered);
372+
373+
// 2 complete frames, 0 partial frames: should NOT trigger a read
374+
channel.writeInbound(wrappedBuffer(new byte[] { 2 }), wrappedBuffer(new byte[] { 3, 4, 5 }));
375+
assertEquals(1, interceptor.readsTriggered);
376+
377+
// 1 complete frame, 1 partial frame: should NOT trigger a read
378+
channel.writeInbound(wrappedBuffer(new byte[] { 6, 7, 8 }), wrappedBuffer(new byte[] { 9 }));
379+
assertEquals(1, interceptor.readsTriggered);
380+
381+
// 1 complete frame, 1 partial frame: should NOT trigger a read
382+
channel.writeInbound(wrappedBuffer(new byte[] { 10, 11 }), wrappedBuffer(new byte[] { 12 }));
383+
assertEquals(1, interceptor.readsTriggered);
384+
385+
// 0 complete frames, 1 partial frame: SHOULD trigger a read
386+
channel.writeInbound(wrappedBuffer(new byte[] { 13 }));
387+
assertEquals(2, interceptor.readsTriggered);
388+
389+
// 1 complete frame, 0 partial frames: should NOT trigger a read
390+
channel.writeInbound(wrappedBuffer(new byte[] { 14 }));
391+
assertEquals(2, interceptor.readsTriggered);
392+
393+
for (int i = 0; i < 5; i++) {
394+
ByteBuf read = channel.readInbound();
395+
assertEquals(i * 3 + 0, read.getByte(0));
396+
assertEquals(i * 3 + 1, read.getByte(1));
397+
assertEquals(i * 3 + 2, read.getByte(2));
398+
read.release();
399+
}
400+
assertFalse(channel.finish());
401+
}
348402
}

0 commit comments

Comments
 (0)